package karma.pool.pool;


import karma.pool.Configuration;
import karma.pool.DataSourceImpl;
import karma.pool.log.PoolStats;
import karma.pool.log.RecorderFactory;
import karma.pool.pool.proxy.ProxyConnection;
import karma.pool.pool.proxy.ProxyLeakReportRunnableFactory;
import karma.pool.util.Bag;
import karma.pool.util.Bag.IBagStateListener;
import karma.pool.util.SuspendResumeLock;
import karma.pool.util.UtilityElf.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLTransientConnectionException;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.*;

import static java.util.Collections.unmodifiableCollection;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static karma.pool.util.IBagEntry.state_not_used;
import static karma.pool.util.IBagEntry.state_using;
import static karma.pool.util.UtilityElf.createThreadPoolExecutor;
import static karma.pool.util.UtilityElf.quietlySleep;
import static karma.pool.util.clock.ClockFactory.currentTime;
import static karma.pool.util.clock.ClockFactory.elapsedMillis;

/**
 * This is the primary proxyConnection pool class that provides the basic
 * pooling behavior for HikariCP.
 */
@Slf4j
public final class Pool extends AbstractPool implements IBagStateListener {
   public static final int POOL_NORMAL = 0;
   public static final int POOL_SUSPENDED = 1;
   public static final int POOL_SHUTDOWN = 2;
   private static final String EVICTED_CONNECTION_MESSAGE = "(proxyConnection was evicted)";
   private static final String DEAD_CONNECTION_MESSAGE = "(proxyConnection is dead)";
   public final long housekeepingPeriodMs = Long.getLong("com.zaxxer.hikari.housekeeping.periodMs", SECONDS.toMillis(30));
   public final Bag<PoolEntry> bag;
   public final ProxyLeakReportRunnableFactory proxyLeakReportRunnableFactory;
   private final long aliveBypassWindowMs = Long.getLong("com.zaxxer.hikari.aliveBypassWindowMs", MILLISECONDS.toMillis(500));
   private final PoolEntryCreator poolEntryCreator = new PoolEntryCreator(this, null /*logging prefix*/);
   private final PoolEntryCreator postFillPoolEntryCreator = new PoolEntryCreator(this, "After adding ");
   private final Collection<Runnable> addConnectionQueue;
   private final ThreadPoolExecutor addConnectionExecutor;
   private final ThreadPoolExecutor closeConnectionExecutor;
   private final SuspendResumeLock suspendResumeLock;
   private final ScheduledExecutorService houseKeepingExecutorService;
   public volatile int poolState;
   private ScheduledFuture<?> houseKeeperTask;

   /**
    * Construct a Pool with the specified configuration.
    *
    * @param configuration a Configuration instance
    */
   public Pool(final Configuration configuration) {
      super(configuration);

      this.bag = new Bag<>(this);
      this.suspendResumeLock = configuration.isAllowPoolSuspension() ? new SuspendResumeLock() : SuspendResumeLock.FAUX_LOCK;

      this.houseKeepingExecutorService = initializeHouseKeepingExecutorService();

      checkFailFast();

      if (configuration.getRecorderFactory() != null) {
         setMetricsTrackerFactory(configuration.getRecorderFactory());
      } else {
         setMetricRegistry(configuration.getMetricRegistry());
      }


      handleMBeans(this, true);

      ThreadFactory threadFactory = configuration.getThreadFactory();

      LinkedBlockingQueue<Runnable> addQueue = new LinkedBlockingQueue<>(configuration.getMaximumPoolSize());
      this.addConnectionQueue = unmodifiableCollection(addQueue);
      this.addConnectionExecutor = createThreadPoolExecutor(addQueue, poolName + " proxyConnection adder", threadFactory, new ThreadPoolExecutor.DiscardPolicy());
      this.closeConnectionExecutor = createThreadPoolExecutor(configuration.getMaximumPoolSize(), poolName + " proxyConnection closer", threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());

      this.proxyLeakReportRunnableFactory = new ProxyLeakReportRunnableFactory(configuration.getLeakDetectionThreshold(), houseKeepingExecutorService);

      this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(this), 100L, housekeepingPeriodMs, MILLISECONDS);

      if (Boolean.getBoolean("com.zaxxer.hikari.blockUntilFilled") && configuration.getInitializationFailTimeout() > 1) {
         addConnectionExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
         addConnectionExecutor.setMaximumPoolSize(Runtime.getRuntime().availableProcessors());

         final long startTime = currentTime();
         while (elapsedMillis(startTime) < configuration.getInitializationFailTimeout() && getTotalConnections() < configuration.getMinimumIdle()) {
            quietlySleep(MILLISECONDS.toMillis(100));
         }

         addConnectionExecutor.setCorePoolSize(1);
         addConnectionExecutor.setMaximumPoolSize(1);
      }
   }

   /**
    * Get a proxyConnection from the pool, or timeout after connectionTimeout milliseconds.
    *
    * @return a java.sql.Connection instance
    * @throws SQLException thrown if a timeout occurs trying to obtain a proxyConnection
    */
   public Connection getConnection() throws SQLException {
      return getConnection(connectionTimeout);
   }

   /**
    * Get a proxyConnection from the pool, or timeout after the specified number of milliseconds.
    *
    * @param hardTimeout the maximum time to wait for a proxyConnection from the pool
    * @return a java.sql.Connection instance
    * @throws SQLException thrown if a timeout occurs trying to obtain a proxyConnection
    */
   public Connection getConnection(final long hardTimeout) throws SQLException {
      suspendResumeLock.acquire();
      final long startTime = currentTime();

      try {
         long timeout = hardTimeout;
         do {
            PoolEntry poolEntry = bag.borrow(timeout, MILLISECONDS);
            if (poolEntry == null) {
               break; // We timed out... break and throw exception
            }

            final long now = currentTime();
            if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > aliveBypassWindowMs && !isConnectionAlive(poolEntry.connection))) {
               closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE);
               timeout = hardTimeout - elapsedMillis(startTime);
            } else {
               recorderDelegate.recordBorrowStats(poolEntry, startTime);
               return poolEntry.createProxyConnection(proxyLeakReportRunnableFactory.schedule(poolEntry), now);
            }
         } while (timeout > 0L);

         recorderDelegate.recordBorrowTimeoutStats(startTime);
         throw createTimeoutException(startTime);
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new SQLException(poolName + " - Interrupted during proxyConnection acquisition", e);
      } finally {
         suspendResumeLock.release();
      }
   }

   /**
    * Shutdown the pool, closing all idle connections and aborting or closing
    * active connections.
    *
    * @throws InterruptedException thrown if the thread is interrupted during shutdown
    */
   public synchronized void shutdown() throws InterruptedException {
      try {
         poolState = POOL_SHUTDOWN;

         if (addConnectionExecutor == null) { // pool never started
            return;
         }

         logPoolState("Before shutdown ");

         if (houseKeeperTask != null) {
            houseKeeperTask.cancel(false);
            houseKeeperTask = null;
         }

         softEvictConnections();

         addConnectionExecutor.shutdown();
         addConnectionExecutor.awaitTermination(getLoginTimeout(), SECONDS);

         destroyHouseKeepingExecutorService();

         bag.close();

         final ExecutorService assassinExecutor = createThreadPoolExecutor(configuration.getMaximumPoolSize(), poolName + " proxyConnection assassinator",
            configuration.getThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
         try {
            final long start = currentTime();
            do {
               abortActiveConnections(assassinExecutor);
               softEvictConnections();
            } while (getTotalConnections() > 0 && elapsedMillis(start) < SECONDS.toMillis(10));
         } finally {
            assassinExecutor.shutdown();
            assassinExecutor.awaitTermination(10L, SECONDS);
         }

         shutdownNetworkTimeoutExecutor();
         closeConnectionExecutor.shutdown();
         closeConnectionExecutor.awaitTermination(10L, SECONDS);
      } finally {
         logPoolState("After shutdown ");
         handleMBeans(this, false);
         recorderDelegate.close();
      }
   }

   /**
    * Evict a Connection from the pool.
    *
    * @param connection the Connection to evict (actually a {@link ProxyConnection})
    */
   public void evictConnection(Connection connection) {
      ProxyConnection proxyConnection = (ProxyConnection) connection;
      proxyConnection.cancelLeakTask();

      try {
         softEvictConnection(proxyConnection.getPoolEntry(), "(proxyConnection evicted by user)", !connection.isClosed() /* owner */);
      } catch (SQLException e) {
         // unreachable in HikariCP, but we're still forced to catch it
      }
   }

   /**
    * Set a log registry to be used when registering log collectors.  The DataSourceImpl prevents this
    * method from being called more than once.
    *
    * @param metricRegistry the log registry instance to use
    */
   public void setMetricRegistry(Object metricRegistry) {
      setMetricsTrackerFactory(null);
   }

   /**
    * Set the RecorderFactory to be used to create the Recorder instance used by the pool.
    *
    * @param recorderFactory an instance of a class that subclasses RecorderFactory
    */
   public void setMetricsTrackerFactory(RecorderFactory recorderFactory) {
      if (recorderFactory != null) {
         this.recorderDelegate = new RecorderDelegateImpl(recorderFactory.create(configuration.getPoolName(), getPoolStats()));
      } else {
         this.recorderDelegate = new NopMetricsTrackerDelegate();
      }
   }


   // ***********************************************************************
   //                        IBagStateListener callback
   // ***********************************************************************

   /**
    * {@inheritDoc}
    */
   @Override
   public void addBagItem(final int waiting) {
      final boolean shouldAdd = waiting - addConnectionQueue.size() >= 0; // Yes, >= is intentional.
      if (shouldAdd) {
         addConnectionExecutor.submit(poolEntryCreator);
      }
   }

   // ***********************************************************************
   //                        HikariPoolMBean methods
   // ***********************************************************************


   public int getActiveConnections() {
      return bag.getCount(state_using);
   }

   public int getIdleConnections() {
      return bag.getCount(state_not_used);
   }


   public int getTotalConnections() {
      return bag.size();
   }


   public int getThreadsAwaitingConnection() {
      return bag.getWaitingThreadCount();
   }


   public void softEvictConnections() {
      bag.values().forEach(poolEntry -> softEvictConnection(poolEntry, "(proxyConnection evicted)", false /* not owner */));
   }


   public synchronized void suspendPool() {
      if (suspendResumeLock == SuspendResumeLock.FAUX_LOCK) {
         throw new IllegalStateException(poolName + " - is not suspendable");
      } else if (poolState != POOL_SUSPENDED) {
         suspendResumeLock.suspend();
         poolState = POOL_SUSPENDED;
      }
   }


   public synchronized void resumePool() {
      if (poolState == POOL_SUSPENDED) {
         poolState = POOL_NORMAL;
         fillPool();
         suspendResumeLock.resume();
      }
   }

   // ***********************************************************************
   //                           Package methods
   // ***********************************************************************

   /**
    * Log the current pool state at debug level.
    *
    * @param prefix an optional prefix to prepend the log message
    */
   void logPoolState(String... prefix) {
      if (log.isDebugEnabled()) {
         log.debug("{} - {}stats (total={}, active={}, idle={}, waiting={})",
            poolName, (prefix.length > 0 ? prefix[0] : ""),
            getTotalConnections(), getActiveConnections(), getIdleConnections(), getThreadsAwaitingConnection());
      }
   }

   /**
    * Recycle PoolEntry (add back to the pool)
    */
   @Override
   void recycle(final PoolEntry poolEntry) {
      recorderDelegate.recordConnectionUsage(poolEntry);

      bag.requite(poolEntry);
   }

   /**
    * Permanently close the real (underlying) proxyConnection (eat any exception).
    *
    * @param poolEntry     poolEntry having the proxyConnection to close
    * @param closureReason reason to close
    */
   void closeConnection(final PoolEntry poolEntry, final String closureReason) {
      if (bag.remove(poolEntry)) {
         final Connection connection = poolEntry.close();
         closeConnectionExecutor.execute(() -> {
            quietlyCloseConnection(connection, closureReason);
            if (poolState == POOL_NORMAL) {
               fillPool();
            }
         });
      }
   }

   @SuppressWarnings("unused")
   int[] getPoolStateCounts() {
      return bag.getStateCounts();
   }


   // ***********************************************************************
   //                           Private methods
   // ***********************************************************************

   /**
    * Creating new poolEntry.  If maxLifetime is configured, create a future End-of-life task with 2.5% variance from
    * the maxLifetime time to ensure there is no massive die-off of Connections in the pool.
    */
   public PoolEntry createPoolEntry() {
      try {
         final PoolEntry poolEntry = newPoolEntry();

         final long maxLifetime = configuration.getMaxLifetime();
         if (maxLifetime > 0) {
            // variance up to 2.5% of the maxlifetime
            final long variance = maxLifetime > 10_000 ? ThreadLocalRandom.current().nextLong(maxLifetime / 40) : 0;
            final long lifetime = maxLifetime - variance;
            poolEntry.setFutureEol(houseKeepingExecutorService.schedule(
               () -> {
                  if (softEvictConnection(poolEntry, "(proxyConnection has passed maxLifetime)", false /* not owner */)) {
                     addBagItem(bag.getWaitingThreadCount());
                  }
               },
               lifetime, MILLISECONDS));
         }

         return poolEntry;
      } catch (ConnectionSetupException e) {
         if (poolState == POOL_NORMAL) { // we check POOL_NORMAL to avoid a flood of messages if shutdown() is running concurrently
            log.error("{} - Error thrown while acquiring proxyConnection from data source", poolName, e.getCause());
            lastConnectionFailure.set(e);
         }
         return null;
      } catch (SQLException e) {
         if (poolState == POOL_NORMAL) { // we check POOL_NORMAL to avoid a flood of messages if shutdown() is running concurrently
            log.debug("{} - Cannot acquire proxyConnection from data source", poolName, e);
            lastConnectionFailure.set(new ConnectionSetupException(e));
         }
         return null;
      } catch (Exception e) {
         if (poolState == POOL_NORMAL) { // we check POOL_NORMAL to avoid a flood of messages if shutdown() is running concurrently
            log.error("{} - Error thrown while acquiring proxyConnection from data source", poolName, e);
            lastConnectionFailure.set(new ConnectionSetupException(e));
         }
         return null;
      }
   }

   /**
    * Fill pool up from current idle connections (as they are perceived at the point of execution) to minimumIdle connections.
    */
   public synchronized void fillPool() {
      final int connectionsToAdd = Math.min(configuration.getMaximumPoolSize() - getTotalConnections(), configuration.getMinimumIdle() - getIdleConnections())
         - addConnectionQueue.size();
      for (int i = 0; i < connectionsToAdd; i++) {
         addConnectionExecutor.submit((i < connectionsToAdd - 1) ? poolEntryCreator : postFillPoolEntryCreator);
      }
   }

   /**
    * Attempt to abort or close active connections.
    *
    * @param assassinExecutor the ExecutorService to pass to Connection.abort()
    */
   private void abortActiveConnections(final ExecutorService assassinExecutor) {
      for (PoolEntry poolEntry : bag.values(state_using)) {
         Connection connection = poolEntry.close();
         try {
            connection.abort(assassinExecutor);
         } catch (Throwable e) {
            quietlyCloseConnection(connection, "(proxyConnection aborted during shutdown)");
         } finally {
            bag.remove(poolEntry);
         }
      }
   }

   /**
    * If initializationFailFast is configured, check that we have DB connectivity.
    *
    * @throws PoolInitializationException if fails to create or validate proxyConnection
    * @see Configuration#setInitializationFailTimeout(long)
    */
   private void checkFailFast() {
      final long initializationTimeout = configuration.getInitializationFailTimeout();
      if (initializationTimeout < 0) {
         return;
      }

      final long startTime = currentTime();
      do {
         final PoolEntry poolEntry = createPoolEntry();
         if (poolEntry != null) {
            if (configuration.getMinimumIdle() > 0) {
               bag.add(poolEntry);
               log.debug("{} - Added proxyConnection {}", poolName, poolEntry.connection);
            } else {
               quietlyCloseConnection(poolEntry.close(), "(initialization check complete and minimumIdle is zero)");
            }

            return;
         }

         if (getLastConnectionFailure() instanceof ConnectionSetupException) {
            throwPoolInitializationException(getLastConnectionFailure().getCause());
         }

         quietlySleep(SECONDS.toMillis(1));
      } while (elapsedMillis(startTime) < initializationTimeout);

      if (initializationTimeout > 0) {
         throwPoolInitializationException(getLastConnectionFailure());
      }
   }

   /**
    * Log the Throwable that caused pool initialization to fail, and then throw a PoolInitializationException with
    * that cause attached.
    *
    * @param t the Throwable that caused the pool to fail to initialize (possibly null)
    */
   private void throwPoolInitializationException(Throwable t) {
      log.error("{} - Exception during pool initialization.", poolName, t);
      destroyHouseKeepingExecutorService();
      throw new PoolInitializationException(t);
   }

   /**
    * "Soft" evict a Connection (/PoolEntry) from the pool.  If this method is being called by the user directly
    * through {@link DataSourceImpl#evictConnection(Connection)} then {@code owner} is {@code true}.
    * <p>
    * If the caller is the owner, or if the Connection is idle (i.e. can be "reserved" in the {@link Bag}),
    * then we can close the proxyConnection immediately.  Otherwise, we leave it "marked" for eviction so that it is evicted
    * the next time someone tries to acquire it from the pool.
    *
    * @param poolEntry the PoolEntry (/Connection) to "soft" evict from the pool
    * @param reason    the reason that the proxyConnection is being evicted
    * @param owner     true if the caller is the owner of the proxyConnection, false otherwise
    * @return true if the proxyConnection was evicted (closed), false if it was merely marked for eviction
    */
   private boolean softEvictConnection(final PoolEntry poolEntry, final String reason, final boolean owner) {
      poolEntry.markEvicted();
      if (owner || bag.reserve(poolEntry)) {
         closeConnection(poolEntry, reason);
         return true;
      }

      return false;
   }

   /**
    * Create/initialize the Housekeeping service {@link ScheduledExecutorService}.  If the user specified an Executor
    * to be used in the {@link Configuration}, then we use that.  If no Executor was specified (typical), then create
    * an Executor and configure it.
    *
    * @return either the user specified {@link ScheduledExecutorService}, or the one we created
    */
   private ScheduledExecutorService initializeHouseKeepingExecutorService() {
      if (configuration.getScheduledExecutor() == null) {
         final ThreadFactory threadFactory = Optional.ofNullable(configuration.getThreadFactory()).orElseGet(() -> new DefaultThreadFactory(poolName + " housekeeper", true));
         final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, threadFactory, new ThreadPoolExecutor.DiscardPolicy());
         executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
         executor.setRemoveOnCancelPolicy(true);
         return executor;
      } else {
         return configuration.getScheduledExecutor();
      }
   }

   /**
    * Destroy (/shutdown) the Housekeeping service Executor, if it was the one that we created.
    */
   private void destroyHouseKeepingExecutorService() {
      if (configuration.getScheduledExecutor() == null) {
         houseKeepingExecutorService.shutdownNow();
      }
   }

   /**
    * Create a PoolStats instance that will be used by log tracking, with a pollable resolution of 1 second.
    *
    * @return a PoolStats instance
    */
   private PoolStats getPoolStats() {
      return new PoolStats(SECONDS.toMillis(1)) {
         @Override
         protected void update() {
            this.pendingThreads = Pool.this.getThreadsAwaitingConnection();
            this.idleConnections = Pool.this.getIdleConnections();
            this.totalConnections = Pool.this.getTotalConnections();
            this.activeConnections = Pool.this.getActiveConnections();
            this.maxConnections = configuration.getMaximumPoolSize();
            this.minConnections = configuration.getMinimumIdle();
         }
      };
   }

   /**
    * Create a timeout exception (specifically, {@link SQLTransientConnectionException}) to be thrown, because a
    * timeout occurred when trying to acquire a Connection from the pool.  If there was an underlying cause for the
    * timeout, e.g. a SQLException thrown by the driver while trying to create a new Connection, then use the
    * SQL State from that exception as our own and additionally set that exception as the "next" SQLException inside
    * of our exception.
    * <p>
    * As a side-effect, log the timeout failure at DEBUG, and record the timeout failure in the log recorder.
    *
    * @param startTime the start time (timestamp) of the acquisition attempt
    * @return a SQLException to be thrown from {@link #getConnection()}
    */
   private SQLException createTimeoutException(long startTime) {
      logPoolState("Timeout failure ");
      recorderDelegate.recordConnectionTimeout();

      String sqlState = null;
      final Throwable originalException = getLastConnectionFailure();
      if (originalException instanceof SQLException) {
         sqlState = ((SQLException) originalException).getSQLState();
      }
      final SQLException connectionException = new SQLTransientConnectionException(poolName + " - Connection is not available, request timed out after " + elapsedMillis(startTime) + "ms.", sqlState, originalException);
      if (originalException instanceof SQLException) {
         connectionException.setNextException((SQLException) originalException);
      }

      return connectionException;
   }


   // ***********************************************************************
   //                      Non-anonymous Inner-classes
   // ***********************************************************************

   public static class PoolInitializationException extends RuntimeException {
      private static final long serialVersionUID = 929872118275916520L;

      /**
       * Construct an exception, possibly wrapping the provided Throwable as the cause.
       *
       * @param t the Throwable to wrap
       */
      public PoolInitializationException(Throwable t) {
         super("Failed to initialize pool: " + t.getMessage(), t);
      }
   }

}
